package com.mjf.utils;

import com.mjf.bean.TransientSink;
import com.mjf.common.GmallConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * ClickHouse 工具类(5条数据写出一次)
 */
public class ClickHouseUtil {

    public static <T> SinkFunction<T> getSink(String sql) {

        return JdbcSink.<T>sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, T t) throws SQLException {

                        try {
                            // 获取所有的字段信息
                            Field[] fields = t.getClass().getDeclaredFields();

                            // 遍历字段
                            int offset = 0;
                            for (int i = 0; i < fields.length; i++) {
                                // 获取字段
                                Field field = fields[i];
                                // 设置私有属性可访问
                                field.setAccessible(true);

                                // 获取字段上注解
                                TransientSink annotation = field.getAnnotation(TransientSink.class);
                                if (annotation != null) {
                                    offset++;   // 存在注解时，下标向前移
                                    continue;   // 如果字段上存在注解，则不写出到clickhouse
                                }

                                // 获取值
                                Object value = field.get(t);
                                // 给预编译SQL对象赋值
                                preparedStatement.setObject(i + 1 - offset, value);
                            }
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        }

                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5)   // 5条数据写出一次
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(GmallConfig.CLICKHOUSE_DRIVER)
                        .withUrl(GmallConfig.CLICKHOUSE_URL)
                        .build()
        );

    }

}
